package org.zjvis.datascience.jobserver.common.util;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.zjvis.datascience.common.constant.URLConstant;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;

/**
 * @description Utility for working with a reusable SparkContext: forwards and runs Spark jobs
 *              through the Spark job server REST API, queries the YARN REST API, and posts job
 *              results to the local callback endpoint.
 * @date 2021-12-28
 */
@Component
public class SparkJobServerRestUtil {

    private static final Logger logger = LoggerFactory.getLogger("SparkJobServerRestUtil");

    /** Response type token shared by every call that reads the response body as plain text. */
    private static final ParameterizedTypeReference<String> STRING_BODY =
            new ParameterizedTypeReference<String>() {
            };

    /** Kept as a field so the underlying connection pool can be released on shutdown. */
    private CloseableHttpClient httpClient;

    private RestTemplate restTemplate;

    @Value("${yarn.webapp.address}")
    private String yarnWebappAddress;

    @Value("${restful.sparkJobServer.address}")
    private String sparkJobServerAddress;

    @Value("${server.port}")
    private int port;

    /**
     * Builds the {@link RestTemplate} used by every method of this class.
     *
     * <p>SECURITY NOTE: the SSL context trusts self-signed certificates and hostname
     * verification is disabled ({@link NoopHostnameVerifier}). This is presumably required for
     * the {@code https://localhost} callback in {@link #notifyFinalStatus(JSONObject)}, but the
     * same relaxed client is also used for the YARN and job server calls — confirm this is
     * acceptable for the deployment.
     *
     * @throws KeyStoreException        if the trust material cannot be loaded
     * @throws NoSuchAlgorithmException if the SSL context algorithm is unavailable
     * @throws KeyManagementException   if the SSL context cannot be initialised
     */
    @PostConstruct
    public void init() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
        SSLContext sslContext = org.apache.http.ssl.SSLContexts.custom()
                .loadTrustMaterial(null, new TrustSelfSignedStrategy())
                .build();

        SSLConnectionSocketFactory csf = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);

        this.httpClient = HttpClients.custom().setSSLSocketFactory(csf).build();

        HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
        requestFactory.setHttpClient(this.httpClient);
        this.restTemplate = new RestTemplate(requestFactory);
    }

    /**
     * Closes the HTTP client created in {@link #init()} when the bean is destroyed, so its
     * connections are not left open. A failure to close is logged rather than propagated,
     * because nothing useful can be done about it during shutdown.
     */
    @PreDestroy
    public void shutdown() {
        if (httpClient == null) {
            return;
        }
        try {
            httpClient.close();
        } catch (IOException e) {
            logger.warn("Failed to close the HTTP client of SparkJobServerRestUtil", e);
        } finally {
            httpClient = null;
        }
    }

    /**
     * Notifies dataScience of the run result of a Spark job by POSTing the given JSON to
     * {@code /callback/updateJobStatus} on {@code https://localhost:<server.port>}. The response
     * is only logged.
     *
     * @param json payload sent as the request body
     */
    public void notifyFinalStatus(JSONObject json) {
        String url = "https://localhost:" + port + "/callback/updateJobStatus";
        HttpEntity<JSONObject> req = new HttpEntity<>(json);
        ResponseEntity<Object> entity = restTemplate.exchange(url, HttpMethod.POST, req, Object.class);
        // Parameterized form logs the same text but defers toString() until INFO is enabled.
        logger.info("{}", entity);
    }

    /**
     * Queries the applications scheduled by YARN via {@code /ws/v1/cluster/apps}.
     *
     * @param args query string appended verbatim after {@code ?}
     * @return the response body parsed as JSON
     */
    public JSONObject queryApps(String args) {
        String url = yarnWebappAddress + "/ws/v1/cluster/apps?" + args;
        ResponseEntity<JSONObject> exchange = restTemplate.exchange(url, HttpMethod.GET, HttpEntity.EMPTY, JSONObject.class);
        return exchange.getBody();
    }

    /**
     * Submits a job to the job server under the given context.
     *
     * @param inputString value sent in the request body as {@code input.string = <inputString>}
     * @param contextName name of the job server context, substituted into
     *                    {@code URLConstant.RUN_JOB_PARAMS}
     * @return the raw response body
     */
    public String submitToJobServer(String inputString, String contextName) {
        logger.info("enter SparkJobServerRestUtil.submitToJobServer, inputString={}, contextName={}", inputString, contextName);
        // Per the original author's note: this call can take an arbitrary number of seconds; it
        // returns only after the job has finished, and the result includes the applicationId.
        String url = jobServerUrl(URLConstant.RUN_JOB_PARAMS, contextName);
        HttpEntity<String> req = new HttpEntity<>("input.string = " + inputString);
        String body = exchangeForString(url, HttpMethod.POST, req);
        logger.info("exit SparkJobServerRestUtil.submitToJobServer<===");
        return body;
    }

    /**
     * Creates a context on the job server.
     *
     * @param contextName name of the context, substituted into {@code URLConstant.CREATE_CONTEXT}
     * @return the raw response body
     */
    public String createContext(String contextName) {
        String url = jobServerUrl(URLConstant.CREATE_CONTEXT, contextName);
        logger.info("SparkJobServerRestUtil created a context and url={}", url);
        return exchangeForString(url, HttpMethod.POST, HttpEntity.EMPTY);
    }

    /**
     * Stops a context on the job server.
     *
     * @param contextName name of the context, substituted into {@code URLConstant.DELETE_CONTEXT}
     * @return the raw response body
     */
    public String deleteContext(String contextName) {
        logger.info("SparkJobServerRestUtil gonna delete a context whose name is {} and at {}", contextName, sparkJobServerAddress);
        String url = jobServerUrl(URLConstant.DELETE_CONTEXT, contextName);
        return exchangeForString(url, HttpMethod.DELETE, HttpEntity.EMPTY);
    }

    /** Builds a job server URL by formatting {@code pathTemplate} with the context name. */
    private String jobServerUrl(String pathTemplate, String contextName) {
        return sparkJobServerAddress + "/" + String.format(pathTemplate, contextName);
    }

    /** Performs the HTTP exchange and returns the response body as a string. */
    private String exchangeForString(String url, HttpMethod method, HttpEntity<?> request) {
        ResponseEntity<String> response = restTemplate.exchange(url, method, request, STRING_BODY);
        return response.getBody();
    }
}
